package com.atguigu.day06;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

public class Flink11_Timer_Exec {
    public static void main(String[] args) throws Exception {
//1.获取流的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //2.从端口读取数据
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);


        //3.将数据转为JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDStream = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] split = value.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        });

        //4.将相同key的数据聚和到一块
        KeyedStream<WaterSensor, Tuple> keyedStream = waterSensorDStream.keyBy("id");

        //5.处理需求
        /**
         * 监控水位传感器的水位值，如果水位值在五秒钟之内连续上升，则报警，并将报警信息输出到侧输出流。
         *  每一个5秒内第一数据来的时候先注册一个5S定时器，如果下一次水位没有上升，则删除定时器，如果上升的话则什么也不做，等定时器报警。
         *  如何判断定时器有没有注册(把定时时间保存起来),如果定时器删除的话则要重置定时器时间，为了下次注册定时器
         *  如何判断水位是否上升(可以把上一次水位保存起来)
         */

        SingleOutputStreamOperator<WaterSensor> process = keyedStream.process(new KeyedProcessFunction<Tuple, WaterSensor, WaterSensor>() {
            //用来保存上一次水位
            private Integer lastVc = Integer.MIN_VALUE;
            //用来保存定时器时间
            private Long timer = Long.MIN_VALUE;

            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
                //1.判断水位是否上升
                if (value.getVc() > lastVc) {
                    //2.水位上升，则判断定时器是否被注册，如果没有注册则注册定时器
                    if (timer == Long.MIN_VALUE) {
                        //定时器没有注册
                        //3.注册定时器
                        timer = ctx.timerService().currentProcessingTime() + 5000;
                        System.out.println("注册定时器:" + ctx.timerService().currentProcessingTime());
                        ctx.timerService().registerProcessingTimeTimer(timer);
                    }
                } else {
                    //4.水位没有上升
                    //5.判断之前有没有注册定时器
                    if (timer == Long.MIN_VALUE) {

                    } else {
                        //6.取消之前注册的定时器
                        System.out.println("删除定时器");
                        ctx.timerService().deleteProcessingTimeTimer(timer);
                        //7.为了方便下一次水位上升时注册，需要重置定时器时间
                        timer = Long.MIN_VALUE;
                    }
                }

                //8.无论如何，都要将本次的水位保存起来，以供下一次水位来的时候做对比
                lastVc = value.getVc();
                out.collect(value);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<WaterSensor> out) throws Exception {
                //9.如果报警则在侧输出中发出报警信息
                ctx.output(new OutputTag<String>("output") {
                }, "警报！！！！" + ctx.getCurrentKey() + "水位连续5s上升");
                //10.已经报警则重置定时器时间，一遍后面数据注册使用
                timer = Long.MIN_VALUE;

            }
        });

        process.print("主流");

        process.getSideOutput(new OutputTag<String>("output") {
        }).print("报警");

        env.execute();
    }
}
